package com.flink_demo.demo.eventtime.table;

import java.util.Iterator;
import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;

/**
 * Demo Flink job: reads comma-separated "user,score,date" records from the Kafka
 * topic "event_time", pre-aggregates them in 5-second processing-time tumbling
 * windows, registers the windowed stream as a table, and runs a continuous SQL
 * aggregation whose retract stream is printed to stdout.
 */
public class KafkaWindowProcessTimeSql {
	public static final String ZOOKEEPER_HOST = "192.168.2.200:2181";
	public static final String KAFKA_BROKER = "192.168.2.200:9092";
	public static final String TRANSACTION_GROUP = "kafka_demo";

	public static void main(String... args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// Create the table environment bound to the streaming environment.
		StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
		// NOTE(review): watermarks are irrelevant under ProcessingTime, so this setting
		// has no effect here; presumably kept for parity with an event-time variant.
		// Fixed the literal suffix: was "10000l" (lowercase l reads as the digit 1).
		env.getConfig().setAutoWatermarkInterval(10000L);
		// NOTE(review): a 500 ms checkpoint interval is very aggressive for anything
		// beyond a local demo — confirm before reusing in production.
		env.enableCheckpointing(500);
		env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
		env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args));
		// Configure the Kafka consumer.
		Properties kafkaProps = new Properties();
		kafkaProps.setProperty("zookeeper.connect", ZOOKEEPER_HOST);
		kafkaProps.setProperty("bootstrap.servers", KAFKA_BROKER);
		kafkaProps.setProperty("group.id", TRANSACTION_GROUP);

		// The topic name is "event_time"; the default SimpleStringSchema() suffices.
		DataStream<Tuple3<String, Long, String>> transaction = env
				.addSource(new FlinkKafkaConsumer010<String>("event_time", new SimpleStringSchema(), kafkaProps))
				.map(new MapFunction<String, Tuple3<String, Long, String>>() {

					/** Splits each CSV line into (user, score, date). */
					private static final long serialVersionUID = 1L;

					@Override
					public Tuple3<String, Long, String> map(String value) throws Exception {
						// Limit -1 preserves trailing empty fields so val[2] always exists
						// for well-formed two-comma input.
						String[] val = value.split(",", -1);
						return new Tuple3<>(val[0], Long.valueOf(val[1]), val[2]);
					}
				});
		// transaction.keyBy(0).window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
		SingleOutputStreamOperator<Tuple3<String, Long, String>> ws = transaction
				.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
				.apply(new AllWindowFunction<Tuple3<String, Long, String>, Tuple3<String, Long, String>, TimeWindow>() {
					private static final long serialVersionUID = 1L;

					/**
					 * Sums the scores of every record in the window into one output tuple.
					 * NOTE(review): this is a non-keyed (windowAll) aggregation, so only the
					 * LAST record's user and date survive — a window mixing several users
					 * emits a single tuple attributed to just one of them. Confirm this is
					 * intended before trusting the per-user SQL aggregation below.
					 */
					@Override
					public void apply(TimeWindow window, Iterable<Tuple3<String, Long, String>> values,
							Collector<Tuple3<String, Long, String>> out) throws Exception {
						Iterator<Tuple3<String, Long, String>> it = values.iterator();
						String user = "";
						long score = 0;
						String pdate = "";
						while (it.hasNext()) {
							Tuple3<String, Long, String> tuple = it.next();
							user = tuple.f0;
							score += tuple.f1;
							pdate = tuple.f2;
						}
						out.collect(new Tuple3<>(user, score, pdate));
					}
				});
		tableEnv.registerDataStream("usertable", ws, "user, score, qdate");
		// NOTE(review): "user" is a reserved keyword in Calcite-based SQL dialects; if
		// this query fails to parse, quote it as `user` here and in the field list above.
		Table table = tableEnv.sql("SELECT user, SUM(score) as score FROM usertable GROUP BY user");
		DataStream<Tuple2<Boolean, User>> result = tableEnv.toRetractStream(table, User.class);
		result.print();
		env.execute();
	}

	/** Result POJO for the retract stream: a user and their accumulated score. */
	public static class User {
		public String user;
		public long score;

		// Public no-arg constructor so Flink recognizes this class as a POJO.
		public User() {

		}

		public User(String user, long score) {
			this.user = user;
			this.score = score;
		}

		@Override
		public String toString() {
			return "user " + user + " " + score;
		}
	}

}
